package org.ecbox.sdk.internal.stream.message;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

import org.ecbox.sdk.internal.stream.NamedThreadFactory;

/**
 * Message-consuming factory backed by a thread pool.
 * <p>
 * Tasks handed to {@link #consume(Runnable)} are executed on an internal
 * {@link ThreadPoolExecutor} with a bounded work queue. After
 * {@link #shutdown()} has been called, further tasks are silently dropped.
 * 
 * File: StreamMsgConsumeFactory.java<br/>
 * Description: <br/>
 *
 * Copyright: Copyright (c) 2012 ecbox.com<br/>
 * Company: ECBOX,Inc.<br/>
 *
 * @author chenxiaochun
 * @date 2013-1-5
 * @version 1.0
 */
public class StreamMsgConsumeFactory {

	private final int minThreads;
	private final int maxThreads;
	private final int queueSize;
	private final ThreadPoolExecutor threadPool;

	// Set once shutdown() has been requested. Volatile so that a thread calling
	// consume() observes a shutdown requested by a different thread.
	private volatile boolean isShutDown = false;

	/**
	 * Creates the factory and its underlying thread pool.
	 *
	 * @param minThreads    core pool size; must be greater than 0
	 * @param maxThreads    maximum pool size; must be greater than 0
	 * @param queueSize     capacity of the bounded task queue; must be greater than 0
	 * @param rejectHandler handler invoked when the pool cannot accept a task;
	 *                      when {@code null}, an {@link AbortPolicy} is used
	 * @throws IllegalArgumentException if any numeric argument is not positive, or
	 *                                  (raised by {@link ThreadPoolExecutor}) if
	 *                                  {@code maxThreads < minThreads}
	 */
	public StreamMsgConsumeFactory(int minThreads, int maxThreads, int queueSize, RejectedExecutionHandler rejectHandler) {
		if (minThreads <= 0 || maxThreads <= 0 || queueSize <= 0) {
			throw new IllegalArgumentException("minThread,maxThread and queueSize must large than 0");
		}
		this.minThreads = minThreads;
		this.maxThreads = maxThreads;
		this.queueSize = queueSize;

		// Idle threads above the core size are reclaimed after 60 seconds.
		// NOTE(review): the 'true' flag of NamedThreadFactory presumably marks
		// the threads as daemon - confirm against NamedThreadFactory.
		threadPool = new ThreadPoolExecutor(this.minThreads, this.maxThreads, 60, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(this.queueSize), new NamedThreadFactory("pool-msg-consume", true),
				rejectHandler == null ? new AbortPolicy() : rejectHandler);
	}

	/**
	 * Submits a task for execution unless this factory has been shut down, in
	 * which case the task is silently ignored.
	 *
	 * @param task the task to run
	 * @throws RejectedExecutionException if the pool cannot accept the task and
	 *                                    the configured handler rejects by throwing
	 *                                    (the default {@link AbortPolicy} does)
	 * @throws NullPointerException       if {@code task} is {@code null} and the
	 *                                    factory has not been shut down
	 */
	public void consume(Runnable task) throws RejectedExecutionException, NullPointerException {
		if (!isShutDown) {
			threadPool.execute(task);
		}
	}

	/**
	 * Marks this factory as shut down and asks the pool to shut down. Runtime
	 * exceptions raised while shutting the pool down are deliberately ignored.
	 */
	public void shutdown() {
		// Flip the flag first so consume() stops forwarding new tasks.
		isShutDown = true;
		try {
			threadPool.shutdown();
		} catch (RuntimeException ignored) {
			// Best effort: exceptions thrown during shutdown are ignored.
		}
	}
}
